#define _LARGEFILE64_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <dirent.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>

#include <mtcp_api.h>
#include <mtcp_epoll.h>

#include "cpu.h"
#include "http_parsing.h"
#include "debug.h"

/* Number of per-thread server_vars slots; AcceptConnection() rejects any
 * accepted socket id that is >= this value. */
#define MAX_FLOW_NUM  (10000)

/* SNDBUF_SIZE caps the size of one mtcp_write() in SendUntilAvailable().
 * RCVBUF_SIZE is not referenced in the visible part of this file. */
#define RCVBUF_SIZE (2*1024)
#define SNDBUF_SIZE (8*1024)

/* Size passed to mtcp_epoll_create() and used for the event array. */
#define MAX_EVENTS (MAX_FLOW_NUM * 3)

/* HTTP_HEADER_LEN sizes server_vars.request; MAX_IP_STR_LEN sizes `host`.
 * URL_LEN is not referenced in the visible part of this file. */
#define HTTP_HEADER_LEN 1024
#define URL_LEN 128
#define MAX_IP_STR_LEN 16

/* NUM_SLAVES: slave connections opened per leader connection.
 * MAX_CPUS:   capacity of the per-core arrays (app_thread, done, g_ctx).
 * MAX_FILES:  capacity of the fcache table.
 * LINK_TO_SLAVES is tested with #ifdef, so only its presence matters:
 * defining it as 0 would still enable the slave-connection code. */
#define NUM_SLAVES 2
#define MAX_CPUS 16
#define MAX_FILES 30
#define LINK_TO_SLAVES 1

/* Capacity of thread_context.s_sockid / s_slaveid (connection groups
 * per thread). */
#define MAX_CONNECTIONS 8

/* NOTE: both macros evaluate their arguments more than once; do not pass
 * expressions with side effects. */
#define MAX(a, b) ((a)>(b)?(a):(b))
#define MIN(a, b) ((a)<(b)?(a):(b))

#ifndef TRUE
#define TRUE (1)
#endif

#ifndef FALSE
#define FALSE (0)
#endif

#ifndef ERROR
#define ERROR (-1)
#endif

/* Tested with #if in InitializeServerThread(): when non-zero the thread is
 * pinned to core + num_cores/2 instead of core. */
#define HT_SUPPORT FALSE

/*----------------------------------------------------------------------------*/
/*
 * One entry of the fcache table. SendUntilAvailable() streams bytes from
 * `file` and compares the bytes sent against `size`. The code that fills
 * these entries is not in the visible part of this file.
 */
struct file_cache
{
	char name[128];			// presumably the short file name -- TODO confirm
	char fullname[256];		// presumably the full path -- TODO confirm
	uint64_t size;			// compared against server_vars.total_sent
	char *file;				// base pointer of the bytes passed to mtcp_write()
};
/*----------------------------------------------------------------------------*/
/*
 * Per-socket request/response state. Each thread owns an array of these
 * (thread_context.svars) indexed by socket id.
 */
struct server_vars
{
	char request[HTTP_HEADER_LEN];	// request buffer (not reset by CleanServerVariable)
	int recv_len;					// reset to 0 by CleanServerVariable
	int request_len;				// reset to 0 by CleanServerVariable
	long int total_read, total_sent;	// total_sent: bytes of the file written so far
	uint8_t done;					// non-zero: SendUntilAvailable has nothing to do
	uint8_t rspheader_sent;			// must be non-zero before the body is sent
	uint8_t keep_alive;				// non-zero: keep the socket open after the response

	int fidx;						// file cache index
	char fname[128];				// file name
	long int fsize;					// file size
};
/*----------------------------------------------------------------------------*/
/*
 * Per-core application state, allocated by InitializeServerThread() and
 * published through g_ctx[core].
 */
struct thread_context
{
	int core;							// core index this context was created for
	mctx_t mctx;						// handle returned by mtcp_create_context()
	int ep;								// descriptor returned by mtcp_epoll_create()
	struct server_vars *svars;			// MAX_FLOW_NUM entries, indexed by socket id
	int s_sockid[MAX_CONNECTIONS];		// CreateConnection() result for each leader connection
    int s_slaveid[MAX_CONNECTIONS][NUM_SLAVES];	// CreateConnection() results for the slaves of each leader
};
/* NOTE: pointer typedef -- thread_context_t is `struct thread_context *`. */
typedef struct thread_context* thread_context_t;
/*----------------------------------------------------------------------------*/
/* num_cores: value of GetNumCPUs(), set in main().
 * core_limit: number of worker threads main() starts (set by -n).
 * max_con: number of connection groups each thread opens (set by -c). */
static int num_cores;
static int core_limit;
static int max_con;
/* Per-core arrays, indexed by core id in [0, MAX_CPUS). done[i] is set by
 * SignalHandler() and polled by the worker loop; g_ctx[i] is published by
 * RunTCPStack(). */
static pthread_t app_thread[MAX_CPUS];
static int done[MAX_CPUS];
struct thread_context *g_ctx[MAX_CPUS];
/*----------------------------------------------------------------------------*/
/* File cache read by SendUntilAvailable(). www_main and nfiles are not
 * referenced in the visible part of this file. */
const char *www_main;
static struct file_cache fcache[MAX_FILES];
static int nfiles;
/*----------------------------------------------------------------------------*/
/* Incremented by SendUntilAvailable() each time a response completes.
 * NOTE(review): plain int, updated without synchronization. */
static int finished;
/*----------------------------------------------------------------------------*/
/*memcached server*/
/* daddr/dport are set in main() (defaults, or -s / -p) and are in network
 * byte order. dport is used both as the local listening port and as the
 * remote port of leader connections. `host` is not referenced in the
 * visible part of this file. */
static char host[MAX_IP_STR_LEN + 1];
static in_addr_t daddr;
static in_port_t dport;
static in_addr_t saddr;
/* Repeats the tentative definition of max_con from above. */
static int max_con;

/* Slave endpoints, index-aligned; RunTCPStack() reads NUM_SLAVES entries
 * from each array. Ports are in host byte order here. */
const char *slave_ips[] = {"115.159.119.94","115.159.119.94"};
const int slave_ports[] = { 12222, 12333 };

/*----------------------------------------------------------------------------*/
/*
 * Reset the per-request progress of `sv` so the slot can serve another
 * request. Only the counters and flags are cleared; the request buffer and
 * the file fields (fidx, fname, fsize) are left untouched.
 */
void
CleanServerVariable(struct server_vars *sv)
{
	/* state flags */
	sv->keep_alive = 0;
	sv->rspheader_sent = 0;
	sv->done = 0;

	/* byte counters */
	sv->total_sent = 0;
	sv->total_read = 0;

	/* request bookkeeping */
	sv->request_len = 0;
	sv->recv_len = 0;
}
/*----------------------------------------------------------------------------*/
void
CloseConnection(struct thread_context *ctx, int sockid, struct server_vars *sv)
{
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_DEL, sockid, NULL);
	mtcp_close(ctx->mctx, sockid);
}
/*----------------------------------------------------------------------------*/
/*
 * Write as much of the cached file fcache[sv->fidx] to `sockid` as
 * mtcp_write() accepts right now, in chunks of at most SNDBUF_SIZE bytes.
 *
 * Once total_sent reaches the cached file size the response is counted as
 * finished: a keep-alive socket is switched back to MTCP_EPOLLIN and its
 * state is reset, any other socket is closed.
 *
 * Returns the number of bytes written by this call; 0 when the response is
 * already done or its header has not been sent yet.
 */
static int
SendUntilAvailable(struct thread_context *ctx, int sockid, struct server_vars *sv)
{
	struct mtcp_epoll_event ev;
	int written_now = 0;
	int chunk;
	int n;

	if (sv->done || !sv->rspheader_sent) {
		return 0;
	}

	for (;;) {
		chunk = MIN(SNDBUF_SIZE, sv->fsize - sv->total_sent);
		if (chunk <= 0) {
			break;
		}

		n = mtcp_write(ctx->mctx, sockid,
			fcache[sv->fidx].file + sv->total_sent, chunk);
		if (n < 0) {
			TRACE_APP("Connection closed with client.\n");
			break;
		}
		TRACE_APP("Socket %d: mtcp_write try: %d, ret: %d\n", sockid, chunk, n);
		written_now += n;
		sv->total_sent += n;

		/* a zero-byte write ends this round */
		if (n == 0) {
			break;
		}
	}

	/* file not fully sent yet: try again on a later call */
	if (sv->total_sent < fcache[sv->fidx].size) {
		return written_now;
	}

	sv->done = TRUE;
	finished++;

	if (!sv->keep_alive) {
		/* not keep-alive: close connection */
		CloseConnection(ctx, sockid, sv);
		return written_now;
	}

	/* keep-alive: wait for the next incoming request on this socket */
	ev.events = MTCP_EPOLLIN;
	ev.data.sockid = sockid;
	mtcp_epoll_ctl(ctx->mctx, ctx->ep, MTCP_EPOLL_CTL_MOD, sockid, &ev);

	CleanServerVariable(sv);

	return written_now;
}

/*----------------------------------------------------------------------------*/
int
AcceptConnection(struct thread_context *ctx, int listener)
{
	mctx_t mctx = ctx->mctx;
	struct server_vars *sv;
	struct mtcp_epoll_event ev;
	int c;

	c = mtcp_accept(mctx, listener, NULL, NULL);

	if (c >= 0) {
		if (c >= MAX_FLOW_NUM) {
			TRACE_ERROR("Invalid socket id %d.\n", c);
			return -1;
		}

		sv = &ctx->svars[c];
		CleanServerVariable(sv);
		TRACE_APP("New connection %d accepted.\n", c);
		ev.events = MTCP_EPOLLIN;
		ev.data.sockid = c;
		mtcp_setsock_nonblock(ctx->mctx, c);
		mtcp_epoll_ctl(mctx, ctx->ep, MTCP_EPOLL_CTL_ADD, c, &ev);
		TRACE_APP("Socket %d registered.\n", c);

	}
	else {
		if (errno != EAGAIN) {
			TRACE_ERROR("mtcp_accept() error %s\n",
				strerror(errno));
		}
	}

	return c;
}
/*----------------------------------------------------------------------------*/
/*
 * Pin the calling thread to `core` (or core + num_cores/2 when HT_SUPPORT
 * is enabled) and build its thread_context: an mTCP context, an epoll
 * descriptor sized MAX_EVENTS and a zeroed array of MAX_FLOW_NUM
 * server_vars.
 *
 * Returns the new context, or NULL on failure. On failure everything
 * acquired so far is released, so nothing leaks.
 */
struct thread_context *
	InitializeServerThread(int core)
{
	struct thread_context *ctx;

	/* affinitize application thread to a CPU core */
#if HT_SUPPORT
	mtcp_core_affinitize(core + (num_cores / 2));
#else
	mtcp_core_affinitize(core);
#endif /* HT_SUPPORT */

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx) {
		TRACE_ERROR("Failed to create thread context!\n");
		return NULL;
	}

	/* create mtcp context: this will spawn an mtcp thread */
	ctx->mctx = mtcp_create_context(core);
	if (!ctx->mctx) {
		TRACE_ERROR("Failed to create mtcp context!\n");
		goto err_free_ctx;
	}

	/* create epoll descriptor */
	ctx->ep = mtcp_epoll_create(ctx->mctx, MAX_EVENTS);
	if (ctx->ep < 0) {
		TRACE_ERROR("Failed to create epoll descriptor!\n");
		goto err_destroy_mctx;
	}

	/* allocate memory for server variables */
	ctx->svars = calloc(MAX_FLOW_NUM, sizeof(*ctx->svars));
	if (!ctx->svars) {
		TRACE_ERROR("Failed to create server_vars struct!\n");
		goto err_close_ep;
	}

	ctx->core = core;

	return ctx;

err_close_ep:
	mtcp_close(ctx->mctx, ctx->ep);
err_destroy_mctx:
	mtcp_destroy_context(ctx->mctx);
err_free_ctx:
	free(ctx);
	return NULL;
}
/*----------------------------------------------------------------------------*/
/*
 * Create a non-blocking TCP socket on the thread's mTCP context, bind it
 * to INADDR_ANY on the global `dport` (already in network byte order) and
 * start listening with a backlog of 4096.
 *
 * The socket is not registered with epoll here.
 *
 * Returns the listening socket id, or -1 on failure. The socket is closed
 * on every failure path after it was created.
 */
int
CreateListeningSocket(struct thread_context *ctx)
{
	int listener;
	struct sockaddr_in addr;
	int ret;

	/* create socket and set it as nonblocking */
	listener = mtcp_socket(ctx->mctx, AF_INET, SOCK_STREAM, 0);
	if (listener < 0) {
		TRACE_ERROR("Failed to create listening socket!\n");
		return -1;
	}
	ret = mtcp_setsock_nonblock(ctx->mctx, listener);
	if (ret < 0) {
		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
		goto fail;
	}

	/* bind to the configured port; zero first so padding is defined */
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = INADDR_ANY;
	addr.sin_port = dport;
	ret = mtcp_bind(ctx->mctx, listener,
		(struct sockaddr *)&addr, sizeof(struct sockaddr_in));
	if (ret < 0) {
		TRACE_ERROR("Failed to bind to the listening socket!\n");
		goto fail;
	}

	/* listen (backlog: 4K) */
	ret = mtcp_listen(ctx->mctx, listener, 4096);
	if (ret < 0) {
		TRACE_ERROR("mtcp_listen() failed!\n");
		goto fail;
	}

	return listener;

fail:
	mtcp_close(ctx->mctx, listener);
	return -1;
}


/*
 * Open a non-blocking TCP connection to vdaddr:vdport (both in network
 * byte order) on the thread's mTCP context.
 *
 * `isslave` and `group_id` are passed through unchanged to this project's
 * extended mtcp_connect().
 *
 * Returns the socket id, or -1 if the socket could not be created or the
 * connect failed with an error other than EINPROGRESS. A returned socket
 * may still be connecting. Exits the process if the socket cannot be made
 * non-blocking.
 *
 * NOTE(review): declared plain `inline` (neither static nor extern). Under
 * C99 and later this does not emit an external definition, so linking
 * depends on the compiler inlining every call or on GNU89 inline
 * semantics -- confirm the build flags.
 */
inline int
CreateConnection(thread_context_t ctx,  in_addr_t vdaddr, in_port_t vdport, int isslave, int group_id)
{
	mctx_t mctx = ctx->mctx;
	struct sockaddr_in addr;
	int sockid;
	int ret;

	sockid = mtcp_socket(mctx, AF_INET, SOCK_STREAM, 0);
	if (sockid < 0) {
		TRACE_INFO("Failed to create socket!\n");
		return -1;
	}

	ret = mtcp_setsock_nonblock(mctx, sockid);
	if (ret < 0) {
		TRACE_ERROR("Failed to set socket in nonblocking mode.\n");
		exit(-1);
	}

	/* zero first so the padding bytes handed to mtcp_connect are defined */
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = vdaddr;
	addr.sin_port = vdport;

	ret = mtcp_connect(mctx, sockid,
		(struct sockaddr *)&addr, sizeof(struct sockaddr_in), isslave, group_id);
	if (ret < 0) {
		/* EINPROGRESS is expected for a non-blocking connect */
		if (errno != EINPROGRESS) {
			TRACE_ERROR("Cannot connect to server.\n");
			mtcp_close(mctx, sockid);
			return -1;
		}
	}
	else {
		TRACE_INFO("Server connected: sockid=%d.\n", sockid);
	}

	return sockid;
}

void *
RunTCPStack(void *arg)
{
	int core = *(int *)arg;
	struct thread_context *ctx;
	mctx_t mctx;
	int listener;
	int ep;
	struct mtcp_epoll_event *events;
	int nevents;
	int i, ret;
	int do_accept;
    int group_id = 1;

	/* initialization */
	ctx = InitializeServerThread(core);
	if (!ctx) {
		TRACE_ERROR("Failed to initialize server thread.\n");
		return NULL;
	}
	g_ctx[core] = ctx;
	mctx = ctx->mctx;
	ep = ctx->ep;

	events = (struct mtcp_epoll_event *)
		calloc(MAX_EVENTS, sizeof(struct mtcp_epoll_event));
	if (!events) {
		TRACE_ERROR("Failed to create event struct!\n");
		exit(-1);
	}

	listener = CreateListeningSocket(ctx);
	if (listener < 0) {
		TRACE_ERROR("Failed to create listening socket.\n");
		exit(-1);
	}
	
	i = 0;

#ifdef LINK_TO_SLAVES
    in_addr_t sdaddr[NUM_SLAVES];
    in_port_t sdport[NUM_SLAVES];
    int j = 0;
    for(j = 0; j < NUM_SLAVES; j++)
    {
        sdaddr[j] = inet_addr(slave_ips[j]);
        sdport[j] = htons(slave_ports[j]);
    }
#endif


	while (i < max_con && !done[core]) {
        g_ctx[core]->s_sockid[i] = CreateConnection(ctx, daddr, dport, S_LEADER, group_id);

#ifdef LINK_TO_SLAVES
        for(j=0; j<NUM_SLAVES; j++)
        {
             g_ctx[core]->s_slaveid[i][j] = CreateConnection(ctx, sdaddr[j], sdport[j], S_SLAVE, group_id);
        }
#endif
        group_id++;
		i++;
		/*
		if (g_ctx[core]->s_sockid[i] > 0) {
			i++;
		}
		*/
	}

	/* destroy mtcp context: this will kill the mtcp thread */
	//mtcp_wait_stack(mctx);
	//sleep(30);
	//mtcp_destroy_connections(mctx);
	mtcp_wait_stack(mctx);
	mtcp_destroy_context(mctx);

	pthread_exit(NULL);

	return NULL;
}
/*----------------------------------------------------------------------------*/
/*
 * Handler registered for SIGINT through mtcp_register_signal().
 *
 * When it runs on worker thread i, it destroys that thread's mTCP
 * connections and sets done[i]. For every other worker that is not yet
 * marked done it forwards the signal with pthread_kill() so that worker
 * runs this handler too.
 *
 * g_ctx[i] is NULL until the worker has finished initialising (and stays
 * NULL if initialisation failed), so it is checked before use.
 */
void
SignalHandler(int signum)
{
	int i;

	for (i = 0; i < core_limit; i++) {
		if (pthread_equal(app_thread[i], pthread_self())) {
			TRACE_INFO("Server thread %d got SIGINT\n", i);
			if (g_ctx[i]) {
				mtcp_destroy_connections(g_ctx[i]->mctx);
			}
			done[i] = TRUE;
		}
		else if (!done[i]) {
			pthread_kill(app_thread[i], signum);
		}
	}
}
/*----------------------------------------------------------------------------*/
int
main(int argc, char **argv)
{
	DIR *dir;
	struct dirent *ent;
	int fd;
	int ret;
	uint64_t total_read;
	struct mtcp_conf mcfg;
	int cores[MAX_CPUS];
	int i, batch_max_len;
	uint32_t  batch_interval = 0;

	num_cores = GetNumCPUs();

	if (argc < 2) {
		printf("Usage: [option] [parameter]\n\
			-s server ip\n\
			-p server port\n\
			-n number of cores\n\
			-i batch interval in us\n\
			-t batch size in Bytes\n");
		return FALSE;
	}

	/* open the directory to serve */

	daddr = inet_addr("10.0.3.25");
	dport = htons(6330);
	saddr = INADDR_ANY;

	for (i = 0; i < argc - 1; i++) {
		if (strcmp(argv[i], "-n") == 0) {
			core_limit = atoi(argv[i + 1]);
			core_limit = (core_limit > num_cores) ? num_cores : core_limit;
			if (core_limit > num_cores) {
				TRACE_CONFIG("CPU limit should be smaller than the "
					"number of CPUS: %d\n", num_cores);
			}
			/**
			* it is important that core limit is set
			* before mtcp_init() is called. You can
			* not set core_limit after mtcp_init()
			*/
			mtcp_getconf(&mcfg);
			mcfg.num_cores = core_limit;
			mtcp_setconf(&mcfg);
		}
		else if (strcmp(argv[i], "-s") == 0) {
			daddr = inet_addr(argv[i + 1]);
		}
		else if (strcmp(argv[i], "-p") == 0) {
			dport = htons(atoi(argv[i + 1]));
		}
		else if (strcmp(argv[i], "-i") == 0) {
			batch_interval = atoi(argv[i + 1]);
			mtcp_getconf(&mcfg);
			mcfg.batch_interval = batch_interval;
			mtcp_setconf(&mcfg);
		}
		else if (strcmp(argv[i], "-t") == 0) {
			batch_max_len = atoi(argv[i + 1]);
			mtcp_getconf(&mcfg);
			mcfg.batch_max_len = batch_max_len;
			mtcp_setconf(&mcfg);
		}
		else if (strcmp(argv[i], "-c") == 0) {
			max_con = atoi(argv[i + 1]);
			mtcp_getconf(&mcfg);
			mcfg.max_server = max_con;
			mtcp_setconf(&mcfg);
		}
	}
	finished = 0;

	/* initialize mtcp */
	ret = mtcp_init("epserver.conf");
	if (ret) {
		TRACE_ERROR("Failed to initialize mtcp\n");
		exit(EXIT_FAILURE);
	}

	/* register signal handler to mtcp */
	mtcp_register_signal(SIGINT, SignalHandler);

	TRACE_INFO("Application initialization finished.\n");

	for (i = 0; i < core_limit; i++) {
		cores[i] = i;
		done[i] = FALSE;

		if (pthread_create(&app_thread[i],
			NULL, RunTCPStack, (void *)&cores[i])) {
			perror("pthread_create");
			TRACE_ERROR("Failed to create server thread.\n");
			exit(-1);
		}
	}

	for (i = 0; i < core_limit; i++) {
		pthread_join(app_thread[i], NULL);
	}
	TRACE_INFO("App_thread joined.\n");

	mtcp_destroy();
	return 0;
}

